<!DOCTYPE html>



  


<html class="theme-next muse use-motion" lang="zh-Hans">
<head>
  <meta charset="UTF-8"/>
<meta http-equiv="X-UA-Compatible" content="IE=edge" />
<meta name="viewport" content="width=device-width, initial-scale=1, maximum-scale=1"/>
<meta name="theme-color" content="#222">









<meta http-equiv="Cache-Control" content="no-transform" />
<meta http-equiv="Cache-Control" content="no-siteapp" />
















  
  
  <link href="/lib/fancybox/source/jquery.fancybox.css?v=2.1.5" rel="stylesheet" type="text/css" />







<link href="/lib/font-awesome/css/font-awesome.min.css?v=4.6.2" rel="stylesheet" type="text/css" />

<link href="/css/main.css?v=5.1.4" rel="stylesheet" type="text/css" />


  <link rel="apple-touch-icon" sizes="180x180" href="/images/apple-touch-icon-next.png?v=5.1.4">


  <link rel="icon" type="image/png" sizes="32x32" href="/images/favicon-32x32-next.png?v=5.1.4">


  <link rel="icon" type="image/png" sizes="16x16" href="/images/favicon-16x16-next.png?v=5.1.4">


  <link rel="mask-icon" href="/images/logo.svg?v=5.1.4" color="#222">





  <meta name="keywords" content="Hexo, NexT" />










<meta name="description" content="1、背景当消息出现大量挤压后，消费端将其代码优化后，重启消费端服务器，从 rocketmq-console 上发现 TPS 为 0，如图所示：乍一看，第一时间得出应用还未恢复，就开始去查看相关的启动日志,通常查看的是应用服务器的 &#x2F;home&#x2F;baseuser&#x2F;logs&#x2F;rockemqlogs&#x2F;rocketmq_client.logs，碰巧又看到如下的错误日志： 1RebalanceService">
<meta property="og:type" content="article">
<meta property="og:title" content="踩坑记：rocketmq-console 消费TPS为0，但消息积压数却在降低是个什么“鬼”">
<meta property="og:url" content="https://www.codingw.net/posts/9b5ea02.html">
<meta property="og:site_name" content="中间件兴趣圈">
<meta property="og:description" content="1、背景当消息出现大量挤压后，消费端将其代码优化后，重启消费端服务器，从 rocketmq-console 上发现 TPS 为 0，如图所示：乍一看，第一时间得出应用还未恢复，就开始去查看相关的启动日志,通常查看的是应用服务器的 &#x2F;home&#x2F;baseuser&#x2F;logs&#x2F;rockemqlogs&#x2F;rocketmq_client.logs，碰巧又看到如下的错误日志： 1RebalanceService">
<meta property="og:locale">
<meta property="og:image" content="https://img-blog.csdnimg.cn/20191130162947883.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70">
<meta property="og:image" content="https://img-blog.csdnimg.cn/20191130163430404.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70">
<meta property="og:image" content="https://img-blog.csdnimg.cn/20191130163718134.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70">
<meta property="article:published_time" content="2020-10-11T15:02:57.000Z">
<meta property="article:modified_time" content="2021-04-26T13:20:52.267Z">
<meta property="article:author" content="中间件兴趣圈">
<meta property="article:tag" content="中间件">
<meta name="twitter:card" content="summary">
<meta name="twitter:image" content="https://img-blog.csdnimg.cn/20191130162947883.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70">



<script type="text/javascript" id="hexo.configurations">
  var NexT = window.NexT || {};
  var CONFIG = {
    root: '',
    scheme: 'Muse',
    version: '5.1.4',
    sidebar: {"position":"left","display":"post","offset":12,"b2t":false,"scrollpercent":false,"onmobile":false},
    fancybox: true,
    tabs: true,
    motion: {"enable":true,"async":false,"transition":{"post_block":"fadeIn","post_header":"slideDownIn","post_body":"slideDownIn","coll_header":"slideLeftIn","sidebar":"slideUpIn"}},
    duoshuo: {
      userId: '0',
      author: '博主'
    },
    algolia: {
      applicationID: '',
      apiKey: '',
      indexName: '',
      hits: {"per_page":10},
      labels: {"input_placeholder":"Search for Posts","hits_empty":"We didn't find any results for the search: ${query}","hits_stats":"${hits} results found in ${time} ms"}
    }
  };
</script>



  <link rel="canonical" href="https://www.codingw.net/posts/9b5ea02.html"/>





  <title>踩坑记：rocketmq-console 消费TPS为0，但消息积压数却在降低是个什么“鬼” | 中间件兴趣圈</title>
  








<meta name="generator" content="Hexo 5.4.0"></head>

<body itemscope itemtype="http://schema.org/WebPage" lang="zh-Hans">

  
  
    
  

  <div class="container sidebar-position-left page-post-detail">
    <div class="headband"></div>

    <header id="header" class="header" itemscope itemtype="http://schema.org/WPHeader">
      <div class="header-inner"><div class="site-brand-wrapper">
  <div class="site-meta ">
    

    <div class="custom-logo-site-title">
      <a href="/"  class="brand" rel="start">
        <span class="logo-line-before"><i></i></span>
        <span class="site-title">中间件兴趣圈</span>
        <span class="logo-line-after"><i></i></span>
      </a>
    </div>
      
        <p class="site-subtitle">微信搜『中间件兴趣圈』，回复『Java』获取200本优质电子书</p>
      
  </div>

  <div class="site-nav-toggle">
    <button>
      <span class="btn-bar"></span>
      <span class="btn-bar"></span>
      <span class="btn-bar"></span>
    </button>
  </div>
</div>

<nav class="site-nav">
  

  
    <ul id="menu" class="menu">
      
        
        <li class="menu-item menu-item-home">
          <a href="/" rel="section">
            
              <i class="menu-item-icon fa fa-fw fa-home"></i> <br />
            
            首页
          </a>
        </li>
      
        
        <li class="menu-item menu-item-categories">
          <a href="/categories/" rel="section">
            
              <i class="menu-item-icon fa fa-fw fa-question-circle"></i> <br />
            
            分类
          </a>
        </li>
      
        
        <li class="menu-item menu-item-archives">
          <a href="/archives/" rel="section">
            
              <i class="menu-item-icon fa fa-fw fa-question-circle"></i> <br />
            
            归档
          </a>
        </li>
      

      
    </ul>
  

  
</nav>



 </div>
    </header>

    <main id="main" class="main">
      <div class="main-inner">
        <div class="content-wrap">
          <div id="content" class="content">
            

  <div id="posts" class="posts-expand">
    

  

  
  
  

  <article class="post post-type-normal" itemscope itemtype="http://schema.org/Article">
  
  
  
  <div class="post-block">
    <link itemprop="mainEntityOfPage" href="https://www.codingw.net/posts/9b5ea02.html">

    <span hidden itemprop="author" itemscope itemtype="http://schema.org/Person">
      <meta itemprop="name" content="">
      <meta itemprop="description" content="">
      <meta itemprop="image" content="/images/avatar.gif">
    </span>

    <span hidden itemprop="publisher" itemscope itemtype="http://schema.org/Organization">
      <meta itemprop="name" content="中间件兴趣圈">
    </span>

    
      <header class="post-header">

        
        
          <h1 class="post-title" itemprop="name headline">踩坑记：rocketmq-console 消费TPS为0，但消息积压数却在降低是个什么“鬼”</h1>
        

        <div class="post-meta">
          <span class="post-time">
            
              <span class="post-meta-item-icon">
                <i class="fa fa-calendar-o"></i>
              </span>
              
                <span class="post-meta-item-text">发表于</span>
              
              <time title="创建于" itemprop="dateCreated datePublished" datetime="2020-10-11T23:02:57+08:00">
                2020-10-11
              </time>
            

            

            
          </span>

          
            <span class="post-category" >
            
              <span class="post-meta-divider">|</span>
            
              <span class="post-meta-item-icon">
                <i class="fa fa-folder-o"></i>
              </span>
              
                <span class="post-meta-item-text">分类于</span>
              
              
                <span itemprop="about" itemscope itemtype="http://schema.org/Thing">
                  <a href="/categories/rocketmq/" itemprop="url" rel="index">
                    <span itemprop="name">rocketmq</span>
                  </a>
                </span>

                
                
              
            </span>
          

          
            
          

          
          
             <span id="/posts/9b5ea02.html" class="leancloud_visitors" data-flag-title="踩坑记：rocketmq-console 消费TPS为0，但消息积压数却在降低是个什么“鬼”">
               <span class="post-meta-divider">|</span>
               <span class="post-meta-item-icon">
                 <i class="fa fa-eye"></i>
               </span>
               
                 <span class="post-meta-item-text">阅读次数&#58;</span>
               
                 <span class="leancloud-visitors-count"></span>
             </span>
          

          
            <span class="post-meta-divider">|</span>
            <span class="page-pv"><i class="fa fa-file-o"></i>
            <span class="busuanzi-value" id="busuanzi_value_page_pv" ></span>次
            </span>
          

          

          

        </div>
      </header>
    

    
    
    
    <div class="post-body" itemprop="articleBody">

      
      

      
        <div id="vip-container"><h2 id="1、背景"><a href="#1、背景" class="headerlink" title="1、背景"></a>1、背景</h2><p>当消息出现大量挤压后，消费端将其代码优化后，重启消费端服务器，从 rocketmq-console 上发现 TPS 为 0，如图所示：<br><img src="https://img-blog.csdnimg.cn/20191130162947883.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70" alt="在这里插入图片描述"><br>乍一看，第一时间得出应用还未恢复，就开始去查看相关的启动日志,通常查看的是应用服务器的 /home/baseuser/logs/rockemqlogs/rocketmq_client.logs，碰巧又看到如下的错误日志：</p>
<figure class="highlight plain"><table><tr><td class="gutter"><pre><span class="line">1</span><br></pre></td><td class="code"><pre><span class="line">RebalanceService - [BUG] ConsumerGroup: consumergourp-1 The consumerId: consumer-client-id-clusterA-192.168.x.x@21932 not in cidAll: [consumer-client-id-clusterA-192.168.x.x@22164]</span><br></pre></td></tr></table></figure>
<span id="more"></span>

<p>上面的日志显示在队列负载时候，当前节点竟然不属于 consumergourp-1 消费组的活跃连接，导致一大片的报错：</p>
<figure class="highlight plain"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br></pre></td><td class="code"><pre><span class="line">2019-11-02 19:29:17 WARN NettyClientPublicExecutor_1 - execute the pull request exception</span><br><span class="line">org.apache.rocketmq.client.exception.MQBrokerException: CODE: 25  DESC: the consumer&#39;s subscription not latest</span><br><span class="line">For more information, please visit the url, http:&#x2F;&#x2F;rocketmq.apache.org&#x2F;docs&#x2F;faq&#x2F;</span><br><span class="line">	at org.apache.rocketmq.client.impl.MQClientAPIImpl.processPullResponse(MQClientAPIImpl.java:639)</span><br><span class="line">	at org.apache.rocketmq.client.impl.MQClientAPIImpl.access$200(MQClientAPIImpl.java:156)</span><br><span class="line">	at org.apache.rocketmq.client.impl.MQClientAPIImpl$2.operationComplete(MQClientAPIImpl.java:592)</span><br><span class="line">	at org.apache.rocketmq.remoting.netty.ResponseFuture.executeInvokeCallback(ResponseFuture.java:51)</span><br><span class="line">	at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract$2.run(NettyRemotingAbstract.java:275)</span><br><span class="line">	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)</span><br><span class="line">	at java.util.concurrent.FutureTask.run(FutureTask.java:266)</span><br><span class="line">	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)</span><br><span class="line">	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)</span><br><span class="line">	at java.lang.Thread.run(Thread.java:745)</span><br></pre></td></tr></table></figure>
<p>乍一看确实是 rocketmq 相关的问题，导致上述 消费TPS 为0，经过半个小时的日志分析，发现这是RocketMQ 这是一种正常现象，最终会自动恢复，这里我留一个<strong>伏笔</strong>，将在我的<strong>知识星球</strong>中与广大星友讨论，<strong>经过日志分析得出 rocketmq 没问题，故后面去查看消息积压，发现消息积压明显在减少，那这就奇了怪了，咋消息积压在快速减少，但为啥消费TPS还是为0呢？</strong></p>
<p>接下来将该问题进行探讨。</p>
<blockquote>
<p>温馨提示：在问题分析部分，作者没有直接给出答案，而是一步一步探寻答案，因此会通过追踪源码来寻求答案，如果大家想急于答案，可以跳过问题分析，直接查看本文末尾的问题解答部分。<br>通过本文的阅读，您将获得如下信息：<br>1、RocketMQ 消费TPS的收集与计算逻辑。<br>2、RocketMQ 监控指标的设计思路。<br>3、RocketMQ 主从同步，消费者从主服务器拉取还是从从服务器拉取的判断逻辑。</p>
</blockquote>
<h2 id="2、问题分析"><a href="#2、问题分析" class="headerlink" title="2、问题分析"></a>2、问题分析</h2><h4 id="2-1-rocketmq-console-数据获获取逻辑探讨"><a href="#2-1-rocketmq-console-数据获获取逻辑探讨" class="headerlink" title="2.1 rocketmq-console 数据获获取逻辑探讨"></a>2.1 rocketmq-console 数据获获取逻辑探讨</h4><p>要解开消费TPS 显示为０的问题，我们首先要来看一下 rocketmq-console 这个页面的展示逻辑，即通过阅读 rocketmq-console的源码来解开其采集逻辑。<br><img src="https://img-blog.csdnimg.cn/20191130163430404.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70" alt="在这里插入图片描述"><br>得知，【消费者】界面查询各个消费组的基本信息的接口为 /consumer/groupList.query，那接下来，我们首先从源码的角度来分析该接口的实现逻辑。其入口如下：</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br><span class="line">26</span><br></pre></td><td class="code"><pre><span class="line">org.apache.rocketmq.console.controller.ConsumerController#list</span><br><span class="line"><span class="meta">@RequestMapping(value = &quot;/groupList.query&quot;)</span></span><br><span class="line"><span class="meta">@ResponseBody</span></span><br><span class="line"><span class="function"><span class="keyword">public</span> Object <span class="title">list</span><span class="params">()</span> </span>&#123;</span><br><span class="line">    <span class="keyword">return</span> consumerService.queryGroupList();</span><br><span class="line">&#125;</span><br><span class="line">就是调用消费服务处理类的 queryGroupList 方法，其实现代码如下：</span><br><span class="line">ConsumerServiceImpl＃queryGroupList</span><br><span class="line"><span class="function"><span class="keyword">public</span> List&lt;GroupConsumeInfo&gt; <span class="title">queryGroupList</span><span class="params">()</span> </span>&#123;</span><br><span class="line">    Set&lt;String&gt; consumerGroupSet = Sets.newHashSet();</span><br><span class="line">    <span class="keyword">try</span> &#123;</span><br><span class="line">        ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();　　<span class="comment">// @1</span></span><br><span class="line">        <span class="keyword">for</span> (BrokerData brokerData : clusterInfo.getBrokerAddrTable().values()) &#123;   <span class="comment">// @2</span></span><br><span class="line">            SubscriptionGroupWrapper subscriptionGroupWrapper = mqAdminExt.getAllSubscriptionGroup(brokerData.selectBrokerAddr(), <span class="number">3000L</span>);  <span class="comment">// @3</span></span><br><span class="line">            consumerGroupSet.addAll(subscriptionGroupWrapper.getSubscriptionGroupTable().keySet());                                                                 </span><br><span class="line">        &#125;</span><br><span class="line">    &#125; <span class="keyword">catch</span> (Exception err) &#123;</span><br><span class="line">        <span class="keyword">throw</span> Throwables.propagate(err);</span><br><span class="line">    &#125;</span><br><span class="line">    List&lt;GroupConsumeInfo&gt; groupConsumeInfoList = Lists.newArrayList();</span><br><span class="line">    <span class="keyword">for</span> (String consumerGroup : consumerGroupSet) &#123;                                                <span class="comment">// @4</span></span><br><span class="line">        groupConsumeInfoList.add(queryGroup(consumerGroup));                              </span><br><span class="line">    &#125;</span><br><span class="line">    Collections.sort(groupConsumeInfoList);</span><br><span class="line">    <span class="keyword">return</span> groupConsumeInfoList;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>代码@1：获取集群的 broker 信息，主要是通过向 NameServer 发送 GET_BROKER_CLUSTER_INFO 请求，NameServer 返回集群包含的所有 broker 信息，包含从节点的信息，返回的格式如下：</p>
<figure class="highlight plain"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br></pre></td><td class="code"><pre><span class="line">&quot;clusterInfo&quot;: &#123;</span><br><span class="line">    &quot;brokerAddrTable&quot;: &#123;</span><br><span class="line">	   &quot;broker-a&quot;: &#123;</span><br><span class="line">	       &quot;cluster&quot;: &quot;DefaultCluster&quot;,</span><br><span class="line">			&quot;brokerName&quot;: &quot;broker-a&quot;,</span><br><span class="line">			&quot;brokerAddrs&quot;: &#123;</span><br><span class="line">				&quot;0&quot;: &quot;192.168.0.168:10911&quot;,</span><br><span class="line">				&quot;1&quot;: &quot;192.168.0.169:10911&quot;</span><br><span class="line">			&#125;</span><br><span class="line">		&#125;,</span><br><span class="line">        &quot;broker-b&quot;: &#123;</span><br><span class="line">	       &quot;cluster&quot;: &quot;DefaultCluster&quot;,</span><br><span class="line">			&quot;brokerName&quot;: &quot;broker-b&quot;,</span><br><span class="line">			&quot;brokerAddrs&quot;: &#123;</span><br><span class="line">				&quot;0&quot;: &quot;192.168.0.170:10911&quot;,</span><br><span class="line">				&quot;1&quot;: &quot;192.168.1.171:10911&quot;</span><br><span class="line">			&#125;</span><br><span class="line">		&#125;</span><br><span class="line">	&#125;,</span><br><span class="line">	&quot;clusterAddrTable&quot;: &#123;</span><br><span class="line">		&quot;DefaultCluster&quot;: [&quot;broker-a&quot;,&quot;broker-b&quot;]</span><br><span class="line">	&#125;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>代码@2：遍历集群中的 brokerAddrTable 数据结构，即存储了 broker 的地址信息的 Map 。</p>
<p>代码@3：分别向集群中的主节点(brokerData.selectBrokerAddr()) 获取所有的订阅关系（即消费组的订阅信息）。然后将所有的消费者组名称存入 consumerGroupSet。</p>
<p>代码@4：遍历代码@3收集到的消费组，调用 queryGroup 依次请求消费组的运行时信息，后面接下来详细分析。</p>
<p>接下来将重点分析 queryGroup方法的实现细节。</p>
<p>ConsumerServiceImpl#queryGroup</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br><span class="line">26</span><br><span class="line">27</span><br><span class="line">28</span><br><span class="line">29</span><br><span class="line">30</span><br><span class="line">31</span><br><span class="line">32</span><br><span class="line">33</span><br><span class="line">34</span><br></pre></td><td class="code"><pre><span class="line"><span class="function"><span class="keyword">public</span> GroupConsumeInfo <span class="title">queryGroup</span><span class="params">(String consumerGroup)</span> </span>&#123;</span><br><span class="line">    GroupConsumeInfo groupConsumeInfo = <span class="keyword">new</span> GroupConsumeInfo();</span><br><span class="line">    <span class="keyword">try</span> &#123;</span><br><span class="line">        ConsumeStats consumeStats = <span class="keyword">null</span>;</span><br><span class="line">        <span class="keyword">try</span> &#123;</span><br><span class="line">            consumeStats = mqAdminExt.examineConsumeStats(consumerGroup);  <span class="comment">// @1</span></span><br><span class="line">        &#125; <span class="keyword">catch</span> (Exception e) &#123;</span><br><span class="line">            logger.warn(<span class="string">&quot;examineConsumeStats exception, &quot;</span> + consumerGroup, e);</span><br><span class="line">        &#125;</span><br><span class="line">        ConsumerConnection consumerConnection = <span class="keyword">null</span>;</span><br><span class="line">        <span class="keyword">try</span> &#123;</span><br><span class="line">            consumerConnection = mqAdminExt.examineConsumerConnectionInfo(consumerGroup); </span><br><span class="line">        &#125; <span class="keyword">catch</span> (Exception e) &#123;</span><br><span class="line">            logger.warn(<span class="string">&quot;examineConsumerConnectionInfo exception, &quot;</span> + consumerGroup, e);</span><br><span class="line">        &#125;</span><br><span class="line">        groupConsumeInfo.setGroup(consumerGroup);</span><br><span class="line"></span><br><span class="line">        <span class="keyword">if</span> (consumeStats != <span class="keyword">null</span>) &#123;</span><br><span class="line">            groupConsumeInfo.setConsumeTps((<span class="keyword">int</span>)consumeStats.getConsumeTps());    <span class="comment">// @2</span></span><br><span class="line">            groupConsumeInfo.setDiffTotal(consumeStats.computeTotalDiff());                   <span class="comment">// @3</span></span><br><span class="line">        &#125;</span><br><span class="line"></span><br><span class="line">        <span class="keyword">if</span> (consumerConnection != <span class="keyword">null</span>) &#123;</span><br><span class="line">            groupConsumeInfo.setCount(consumerConnection.getConnectionSet().size());</span><br><span class="line">            groupConsumeInfo.setMessageModel(consumerConnection.getMessageModel());</span><br><span class="line">            groupConsumeInfo.setConsumeType(consumerConnection.getConsumeType());</span><br><span class="line">            groupConsumeInfo.setVersion(MQVersion.getVersionDesc(consumerConnection.computeMinVersion()));</span><br><span class="line">        &#125;</span><br><span class="line">    &#125; <span class="keyword">catch</span> (Exception e) &#123;</span><br><span class="line">        logger.warn(<span class="string">&quot;examineConsumeStats or examineConsumerConnectionInfo exception, &quot;</span></span><br><span class="line">                + consumerGroup, e);</span><br><span class="line">    &#125;</span><br><span class="line">    <span class="keyword">return</span> groupConsumeInfo;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>从上面@1，@2，@3这三处代码可以得知，rocketmq-console 相关界面上的消费TPS主要来自 examineConsumeStats 方法，该方法我就不再继续深入，我们只需找到该方法向 broker 发送的请求编码，然后根据该请求编码找到 broker 的处理逻辑即可，最后跟踪发送的请求编码为：RequestCode.GET_CONSUME_STATS。</p>
<p>GET_CONSUME_STATS 命令在 broker 的处理逻辑如下：</p>
<p>AdminBrokerProcessor#getConsumeStats</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br><span class="line">26</span><br><span class="line">27</span><br><span class="line">28</span><br><span class="line">29</span><br><span class="line">30</span><br><span class="line">31</span><br><span class="line">32</span><br><span class="line">33</span><br><span class="line">34</span><br><span class="line">35</span><br><span class="line">36</span><br><span class="line">37</span><br><span class="line">38</span><br><span class="line">39</span><br><span class="line">40</span><br><span class="line">41</span><br><span class="line">42</span><br><span class="line">43</span><br><span class="line">44</span><br><span class="line">45</span><br><span class="line">46</span><br><span class="line">47</span><br><span class="line">48</span><br><span class="line">49</span><br><span class="line">50</span><br><span class="line">51</span><br><span class="line">52</span><br><span class="line">53</span><br><span class="line">54</span><br><span class="line">55</span><br><span class="line">56</span><br><span class="line">57</span><br><span class="line">58</span><br><span class="line">59</span><br><span class="line">60</span><br><span class="line">61</span><br><span class="line">62</span><br></pre></td><td class="code"><pre><span class="line"><span class="function"><span class="keyword">private</span> RemotingCommand <span class="title">getConsumeStats</span><span class="params">(ChannelHandlerContext ctx, RemotingCommand request)</span> <span class="keyword">throws</span> RemotingCommandException </span>&#123;</span><br><span class="line">        <span class="keyword">final</span> RemotingCommand response = RemotingCommand.createResponseCommand(<span class="keyword">null</span>);</span><br><span class="line">        <span class="keyword">final</span> GetConsumeStatsRequestHeader requestHeader =</span><br><span class="line">            (GetConsumeStatsRequestHeader) request.decodeCommandCustomHeader(GetConsumeStatsRequestHeader.class);</span><br><span class="line">        ConsumeStats consumeStats = <span class="keyword">new</span> ConsumeStats();</span><br><span class="line">        Set&lt;String&gt; topics = <span class="keyword">new</span> HashSet&lt;String&gt;();</span><br><span class="line">        <span class="keyword">if</span> (UtilAll.isBlank(requestHeader.getTopic())) &#123;</span><br><span class="line">            topics = <span class="keyword">this</span>.brokerController.getConsumerOffsetManager().whichTopicByConsumer(requestHeader.getConsumerGroup());</span><br><span class="line">        &#125; <span class="keyword">else</span> &#123;</span><br><span class="line">            topics.add(requestHeader.getTopic());</span><br><span class="line">        &#125;</span><br><span class="line">        <span class="keyword">for</span> (String topic : topics) &#123;   <span class="comment">// @1</span></span><br><span class="line">            TopicConfig topicConfig = <span class="keyword">this</span>.brokerController.getTopicConfigManager().selectTopicConfig(topic);</span><br><span class="line">            <span class="keyword">if</span> (<span class="keyword">null</span> == topicConfig) &#123;  <span class="comment">// @2</span></span><br><span class="line">                log.warn(<span class="string">&quot;consumeStats, topic config not exist, &#123;&#125;&quot;</span>, topic);</span><br><span class="line">                <span class="keyword">continue</span>;</span><br><span class="line">            &#125;</span><br><span class="line">            &#123;                                </span><br><span class="line">                SubscriptionData findSubscriptionData =</span><br><span class="line">                    <span class="keyword">this</span>.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getConsumerGroup(), topic);   <span class="comment">// @3</span></span><br><span class="line">                <span class="keyword">if</span> (<span class="keyword">null</span> == findSubscriptionData <span class="comment">//</span></span><br><span class="line">                    &amp;&amp; <span class="keyword">this</span>.brokerController.getConsumerManager().findSubscriptionDataCount(requestHeader.getConsumerGroup()) &gt; <span class="number">0</span>) &#123;</span><br><span class="line">                    log.warn(<span class="string">&quot;consumeStats, the consumer group[&#123;&#125;], topic[&#123;&#125;] not exist&quot;</span>, requestHeader.getConsumerGroup(), topic);</span><br><span class="line">                    <span class="keyword">continue</span>;</span><br><span class="line">                &#125;</span><br><span class="line">            &#125;</span><br><span class="line">            <span class="keyword">for</span> (<span class="keyword">int</span> i = <span class="number">0</span>; i &lt; topicConfig.getReadQueueNums(); i++) &#123;   <span class="comment">// @4</span></span><br><span class="line">                MessageQueue mq = <span class="keyword">new</span> MessageQueue();</span><br><span class="line">                mq.setTopic(topic);</span><br><span class="line">                mq.setBrokerName(<span class="keyword">this</span>.brokerController.getBrokerConfig().getBrokerName());</span><br><span class="line">                mq.setQueueId(i);</span><br><span class="line">                OffsetWrapper offsetWrapper = <span class="keyword">new</span> OffsetWrapper();</span><br><span class="line">                <span class="keyword">long</span> brokerOffset = <span class="keyword">this</span>.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i);</span><br><span class="line">                <span class="keyword">if</span> (brokerOffset &lt; <span class="number">0</span>)</span><br><span class="line">                    brokerOffset = <span class="number">0</span>;</span><br><span class="line">                <span class="keyword">long</span> consumerOffset = <span class="keyword">this</span>.brokerController.getConsumerOffsetManager().queryOffset(<span class="comment">//</span></span><br><span class="line">                    requestHeader.getConsumerGroup(), <span class="comment">//</span></span><br><span class="line">                    topic, <span class="comment">//</span></span><br><span class="line">                    i);</span><br><span class="line">                <span class="keyword">if</span> (consumerOffset &lt; <span class="number">0</span>)</span><br><span class="line">                    consumerOffset = <span class="number">0</span>;</span><br><span class="line">                offsetWrapper.setBrokerOffset(brokerOffset);                                   <span class="comment">// @5</span></span><br><span class="line">                offsetWrapper.setConsumerOffset(consumerOffset);                       <span class="comment">// @6</span></span><br><span class="line">                <span class="keyword">long</span> timeOffset = consumerOffset - <span class="number">1</span>;</span><br><span class="line">                <span class="keyword">if</span> (timeOffset &gt;= <span class="number">0</span>) &#123;</span><br><span class="line">                    <span class="keyword">long</span> lastTimestamp = <span class="keyword">this</span>.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, timeOffset);</span><br><span class="line">                    <span class="keyword">if</span> (lastTimestamp &gt; <span class="number">0</span>) &#123;</span><br><span class="line">                        offsetWrapper.setLastTimestamp(lastTimestamp);                 <span class="comment">// @7</span></span><br><span class="line">                    &#125;</span><br><span class="line">                &#125;</span><br><span class="line">                consumeStats.getOffsetTable().put(mq, offsetWrapper);     <span class="comment">// @8</span></span><br><span class="line">            &#125;</span><br><span class="line">            <span class="keyword">double</span> consumeTps = <span class="keyword">this</span>.brokerController.getBrokerStatsManager().tpsGroupGetNums(requestHeader.getConsumerGroup(), topic); <span class="comment">// @9</span></span><br><span class="line">            consumeTps += consumeStats.getConsumeTps(); <span class="comment">// @10</span></span><br><span class="line">            consumeStats.setConsumeTps(consumeTps);</span><br><span class="line">        &#125;</span><br><span class="line">        <span class="keyword">byte</span>[] body = consumeStats.encode();</span><br><span class="line">        response.setBody(body);</span><br><span class="line">        response.setCode(ResponseCode.SUCCESS);</span><br><span class="line">        response.setRemark(<span class="keyword">null</span>);</span><br><span class="line">        <span class="keyword">return</span> response;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>该方法比较长，重点关注如下关键点：<br>代码@1：遍历该消费组订阅的所有主题。消费TPS将是所有主题消费TPS的总和，其他的信息按主题、队列信息单独存放。</p>
<p>代码@2：如果 topic 的元信息不存在，则跳过该主题。</p>
<p>代码@3：如果消费组的订阅信息不存在，则跳过该订阅关系。</p>
<p>代码@4：收集该主题所有的读队列，以messagequeue为键，OffsetWrapper为值存储在 consumeStats.getOffsetTable() ，见代码@8。</p>
<p>代码@5：设置该队列的最新偏移量。</p>
<p>代码@6：设置该消费组对该队列的消费进度，设置为consumeOffset。</p>
<p>代码@7：lastTimestamp 上一次消费的消息的存储时间，实现逻辑为：取消费组对于队列的消息消费进度 -1 的消息，存储在 broker 的时间，如果对应的消息已过期被删除，则在界面上显示的时间就会为1970-01-01 08:00:00。</p>
<p>代码@9：通过 BrokerStatsManager 的 tpsGroupGetNums 方法从统计数据中获取该消费组针对该队列的消费TPS。</p>
<p>代码@10：累积消费TPS，并最终作为该消费组的总TPS。</p>
<p><strong>上面这个方法非常关键，是返回给前段页面核心的数据组装逻辑，以队列、消费组为纬度给出 brokerOffset、consumeOffset、lastTimestamp。然后将数据返回给前段页面进行展示。</strong></p>
<p>接下将聚焦到消费组消费TPS的统计处理，其入口为 <strong>tpsGroupGetNums</strong> 。</p>
<h4 id="2-2-rocketmq-消费TPS统计实现原理"><a href="#2-2-rocketmq-消费TPS统计实现原理" class="headerlink" title="2.2 rocketmq 消费TPS统计实现原理"></a>2.2 rocketmq 消费TPS统计实现原理</h4><h5 id="2-2-1-消费TPS计算逻辑"><a href="#2-2-1-消费TPS计算逻辑" class="headerlink" title="2.2.1 消费TPS计算逻辑"></a>2.2.1 消费TPS计算逻辑</h5><p>首先我们还是从 tpsGroupGetNums 方法入手，探究一下 tps 的获取逻辑，然后再探究数据的采集原理（这也是 rocketmq 监控相关）。</p>
<p>BrokerStatsManager#tpsGroupGetNums</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br></pre></td><td class="code"><pre><span class="line"><span class="function"><span class="keyword">public</span> <span class="keyword">double</span> <span class="title">tpsGroupGetNums</span><span class="params">(<span class="keyword">final</span> String group, <span class="keyword">final</span> String topic)</span> </span>&#123;</span><br><span class="line">    <span class="keyword">final</span> String statsKey = buildStatsKey(topic, group); <span class="comment">// @1</span></span><br><span class="line">    <span class="keyword">return</span> <span class="keyword">this</span>.statsTable.get(GROUP_GET_NUMS).getStatsDataInMinute(statsKey).getTps(); <span class="comment">// @2</span></span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>代码@1：构建统计key，其逻辑为：其键为：topic@consumerGroup，即消息主题@消费组名。</p>
<p>要读懂 代码@2 的代码，先来看一下 rocketmq 监控指标的存储数据结构，如下图所示：<br><img src="https://img-blog.csdnimg.cn/20191130163718134.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70" alt="在这里插入图片描述"><br>正如上图所示：RocketMQ  使用 HashMap&lt; String, StatusItemSet&gt; 来存储监控收集的数据，其中Key 为监控指标的类型，例如 topic 发送消息数量、topic 发送消息大小、消费组获取消息个数等信息，每一项使用 StatsItemSet 存储，该存储结构内部又维护一个HashMap：ConcurrentMap，key 代表某一个具体的统计目标，例如记录消费组拉取消息的数量监控指标，那其统计的对象即 topic@consumer_group，最终数据的载体是 StatsItem，使用如下几个关键字段来记录统计信息：</p>
<ul>
<li>AtomicLong value = new AtomicLong(0)<br>总数量，统计指标TOPIC_GET_NUMS 指标为例，记录的是消息拉取的总条数，例如一次消息拉取操作获取了32条消息，则该数量增加32。</li>
<li>AtomicLong times = new AtomicLong(0)<br>改变上述 value 的次数，还是以统计指标TOPIC_GET_NUMS 指标为例，记录的是增加 value 的次数。</li>
<li>LinkedList&lt; CallSnapshot&gt; csListMinute<br>一分钟的快照信息，该 List 只会存储6个元素，每10s记录一次调用快照，超过6条，则移除第一条，这个将在下文介绍。</li>
<li>LinkedList&lt; CallSnapshot&gt; csListHour<br>一小时的快照信息，该 List 只会存储6个元素，每10分钟记录一次快照，超过6条，则移除第一条。</li>
<li>LinkedList&lt; CallSnapshot&gt; csListDay<br>一天的快照新，该List 只会存储24个元素，每1小时记录一次快照，超过24条，则移除第一条。</li>
</ul>
<p>了解了上述存储结构后，代码@2，最终其实调用的就是 StatsItemSet 的 getStatsDataInMinute 方法。</p>
<p>StatsItemSet#getStatsDataInMinute</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br></pre></td><td class="code"><pre><span class="line"><span class="function"><span class="keyword">public</span> StatsSnapshot <span class="title">getStatsDataInMinute</span><span class="params">(<span class="keyword">final</span> String statsKey)</span> </span>&#123;</span><br><span class="line">    StatsItem statsItem = <span class="keyword">this</span>.statsItemTable.get(statsKey);</span><br><span class="line">    <span class="keyword">if</span> (<span class="keyword">null</span> != statsItem) &#123;</span><br><span class="line">        <span class="keyword">return</span> statsItem.getStatsDataInMinute();</span><br><span class="line">    &#125;</span><br><span class="line">    <span class="keyword">return</span> <span class="keyword">new</span> StatsSnapshot();</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>从代码上最终调用 StatesItem 的 getStatsDataInMinute 方法。</p>
<p>StatesItem#getStatsDataInMinute</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br><span class="line">26</span><br></pre></td><td class="code"><pre><span class="line"><span class="function"><span class="keyword">public</span> StatsSnapshot <span class="title">getStatsDataInMinute</span><span class="params">()</span> </span>&#123;</span><br><span class="line">    <span class="keyword">return</span> computeStatsData(<span class="keyword">this</span>.csListMinute);</span><br><span class="line">&#125;</span><br><span class="line"><span class="function"><span class="keyword">private</span> <span class="keyword">static</span> StatsSnapshot <span class="title">computeStatsData</span><span class="params">(<span class="keyword">final</span> LinkedList&lt;CallSnapshot&gt; csList)</span> </span>&#123;</span><br><span class="line">    StatsSnapshot statsSnapshot = <span class="keyword">new</span> StatsSnapshot();</span><br><span class="line">    <span class="keyword">synchronized</span> (csList) &#123;</span><br><span class="line">        <span class="keyword">double</span> tps = <span class="number">0</span>;</span><br><span class="line">        <span class="keyword">double</span> avgpt = <span class="number">0</span>;</span><br><span class="line">        <span class="keyword">long</span> sum = <span class="number">0</span>;</span><br><span class="line">        <span class="keyword">if</span> (!csList.isEmpty()) &#123;</span><br><span class="line">            CallSnapshot first = csList.getFirst();   <span class="comment">// @1</span></span><br><span class="line">            CallSnapshot last = csList.getLast();    <span class="comment">// @2</span></span><br><span class="line">            sum = last.getValue() - first.getValue();  <span class="comment">// @3</span></span><br><span class="line">            tps = (sum * <span class="number">1000.0d</span>) / (last.getTimestamp() - first.getTimestamp());   <span class="comment">// @4</span></span><br><span class="line">            <span class="keyword">long</span> timesDiff = last.getTimes() - first.getTimes();</span><br><span class="line">            <span class="keyword">if</span> (timesDiff &gt; <span class="number">0</span>) &#123;                                                                                   <span class="comment">// @5</span></span><br><span class="line">                avgpt = (sum * <span class="number">1.0d</span>) / timesDiff;</span><br><span class="line">            &#125;</span><br><span class="line">        &#125;</span><br><span class="line"></span><br><span class="line">        statsSnapshot.setSum(sum);</span><br><span class="line">        statsSnapshot.setTps(tps);</span><br><span class="line">        statsSnapshot.setAvgpt(avgpt);                                                          </span><br><span class="line">    &#125;</span><br><span class="line">    <span class="keyword">return</span> statsSnapshot;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>代码@1：首先取快照中的第一条消息。</p>
<p>代码@2：取快照列表中的最后一条消息。</p>
<p>代码@3：计算这两个时间点 value 的差值，即这段时间内新增的总数。</p>
<p>代码@4：计算这段时间内的tps，即每秒处理的消息条数。</p>
<p>代码@5：计算 avgpt ，即平均一次操作新增的消息条数（即平均一次操作，value 新增的个数）。</p>
<p><strong>消费组的消费TPS的计算逻辑就介绍到这里了，那还有一个疑问，即 StatsItem 中 csListMinute 中的数据从哪来呢？</strong></p>
<h5 id="2-2-2-如何采集消费TPS原始数据"><a href="#2-2-2-如何采集消费TPS原始数据" class="headerlink" title="2.2.2 如何采集消费TPS原始数据"></a>2.2.2 如何采集消费TPS原始数据</h5><p>StatsItem#init</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br></pre></td><td class="code"><pre><span class="line"><span class="function"><span class="keyword">public</span> <span class="keyword">void</span> <span class="title">init</span><span class="params">()</span> </span>&#123;</span><br><span class="line">  <span class="keyword">this</span>.scheduledExecutorService.scheduleAtFixedRate(<span class="keyword">new</span> Runnable() &#123;</span><br><span class="line">            <span class="meta">@Override</span></span><br><span class="line">            <span class="function"><span class="keyword">public</span> <span class="keyword">void</span> <span class="title">run</span><span class="params">()</span> </span>&#123;</span><br><span class="line">                <span class="keyword">try</span> &#123;</span><br><span class="line">                    samplingInSeconds();</span><br><span class="line">                &#125; <span class="keyword">catch</span> (Throwable ignored) &#123;</span><br><span class="line">                &#125;</span><br><span class="line">            &#125;</span><br><span class="line">        &#125;, <span class="number">0</span>, <span class="number">10</span>, TimeUnit.SECONDS);</span><br><span class="line">   <span class="comment">// 省略其他代码</span></span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>原来在创建一个新的StatsItem 的时候，就会启动一个定时任务，每隔 10s 调用 samplingInSeconds 方法进行抽样，那我们简单看一下这个方法：</p>
<p>StatsItem#samplingInSeconds</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br></pre></td><td class="code"><pre><span class="line"><span class="function"><span class="keyword">public</span> <span class="keyword">void</span> <span class="title">samplingInSeconds</span><span class="params">()</span> </span>&#123;</span><br><span class="line">    <span class="keyword">synchronized</span> (<span class="keyword">this</span>.csListMinute) &#123;</span><br><span class="line">        <span class="keyword">this</span>.csListMinute.add(<span class="keyword">new</span> CallSnapshot(System.currentTimeMillis(), <span class="keyword">this</span>.times.get(), <span class="keyword">this</span>.value</span><br><span class="line">                .get()));</span><br><span class="line">        <span class="keyword">if</span> (<span class="keyword">this</span>.csListMinute.size() &gt; <span class="number">7</span>) &#123;</span><br><span class="line">            <span class="keyword">this</span>.csListMinute.removeFirst();</span><br><span class="line">        &#125;</span><br><span class="line">    &#125;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>就是将当前StatsItem 中的 value 与 变更次数(time ) 存入封装成 CallSnapshot ，然后存储在快照列表中。这里的关键是times values 这些值在什么情况下会改变呢？ </p>
<p>接着往下看，源码在消息拉取的时候，会将本次拉取的信息加入到统计信息中，其入口为：</p>
<p>PullMessageProcessor#processRequest</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br></pre></td><td class="code"><pre><span class="line"><span class="keyword">switch</span> (response.getCode()) &#123;</span><br><span class="line">    <span class="keyword">case</span> ResponseCode.SUCCESS:</span><br><span class="line">        <span class="keyword">this</span>.brokerController.getBrokerStatsManager().incGroupGetNums(requestHeader.getConsumerGroup(), requestHeader.getTopic(),</span><br><span class="line">                        getMessageResult.getMessageCount());</span><br><span class="line">        <span class="keyword">this</span>.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(), requestHeader.getTopic(),</span><br><span class="line">                        getMessageResult.getBufferTotalSize());</span><br><span class="line">        <span class="keyword">this</span>.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());</span><br><span class="line">        </span><br><span class="line">    <span class="comment">// 省略其他代码</span></span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>该方法会最终更新 StatsItem 中的 values ，而 times 是 每调用一次，加1。</p>
<p>理论基础讲解完毕后，接下来我们来回答一下题目中的现象。</p>
<h2 id="3、问题解答"><a href="#3、问题解答" class="headerlink" title="3、问题解答"></a>3、问题解答</h2><p>按照上面的讲解，通过 rocketmq-console 发起查看消费组的TPS时，Broker 会根据过去一分钟内采集的快照数据进行计算。快照信息的采集机制是 broker 端会每10s 会记录一下消费组对应的拉取消息数量与拉取次数。</p>
<p><strong>那既然消息延迟(堆积数量在不断减少)，说明消费端正在消费，按道理来说，通过上述机制进行计算，TPS 不可能会是0？那又是什么原因呢？</strong></p>
<p><strong>如果TPS为0，可以说明消费端并没有向 broker 拉取消息，因为一旦从 broker 拉取消息，有关 StatsItem 的 拉取消息总数(value) 与 拉取次数(times) 再两次采集国产中肯定不会相等，只要两者有差距，其TPS就不可能为0，那消费组在消费消息，但又不从主节点上拉取消息，这种情况会出现吗？</strong></p>
<p><strong>答案是会的，在 RocketMQ 主从同步架构中，如果需要访问的消息偏移量与当前 commitlog 最大偏移的之间的差距超过了内存的40%，消息消费将由从节点接管，故此时消费的拉取不会去主节点拉取，故上面返回的TPS就会为0。这样就能完美解答了。</strong></p>
<p>经过上面的分析，我相信大家已经非常认可这个原因了，其实我们还有一个重要的论据，大家可以分别去查看 Rocketmq 主从节点 /home/{username}/logs/rocketmqlogs/stats.log，里面会每隔1分钟在日志中打印各个消费组的消费TPS.</p>
<p>从服务器(rocketmq-slave)对应的日志如下：</p>
<figure class="highlight plain"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br></pre></td><td class="code"><pre><span class="line">INFO - [GROUP_GET_NUMS] [t1@c1] Stats In One Minute, SUM: 785717 TPS: 15714.34 AVGPT: 8.14</span><br><span class="line">INFO - [GROUP_GET_NUMS] [t1@c1] Stats In One Minute, SUM: 940522 TPS: 15675.37 AVGPT: 8.06</span><br></pre></td></tr></table></figure>
<p>主服务器(rocketmq-master)对应的日志如下：</p>
<figure class="highlight plain"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br></pre></td><td class="code"><pre><span class="line">INFO - [GROUP_GET_NUMS] [t1@c1] Stats In One Minute, SUM: 0 TPS: 0.00 AVGPT: 0.00</span><br><span class="line">INFO - [GROUP_GET_NUMS] [t1@c1] Stats In One Minute, SUM: 0 TPS: 0.00 AVGPT: 0.00</span><br></pre></td></tr></table></figure>
<p><strong>主服务器上的TPS一定会0吗？不一定，其实也不一定。这里借着这波日志，再来总结一下 RocketMQ 主从同步时的切换逻辑。</strong></p>
<p>1、如果消费端请求的消息物理偏移量与 broker 当前最新的物理偏移量之间的差距查过内存的40%，下一次拉取会往从节点发送（当然前提是slaveReadEnable = true）。</p>
<p>2、当从节点开始接管消息消费时，下一次拉取请求一定会往从节点发送码？答案也是不一定：</p>
<ul>
<li>如果待拉取的消息偏移量与从节点最新的物理偏移量之间的差距超过内存的30%，下一次拉取请求还是会发往从节点。</li>
<li>如果待拉取的消息偏移量与从节点最新的物理偏移量之际的差距少于内存的30%，下一次拉取请求将发送到主节点。</li>
</ul>
</div>

			<script src="https://my.openwrite.cn/js/readmore.js" type="text/javascript"></script>
			<script>
			var isMobile = navigator.userAgent.match(/(phone|pad|pod|iPhone|iPod|ios|iPad|Android|Mobile|BlackBerry|IEMobile|MQQBrowser|JUC|Fennec|wOSBrowser|BrowserNG|WebOS|Symbian|Windows Phone)/i);
			if (!isMobile) {
			    var btw = new BTWPlugin();
			    btw.init({
			        "id": "vip-container",
			        "blogId": "18019-1573088808868-542",
			        "name": "中间件兴趣圈",
			        "qrcode": "https://img-blog.csdnimg.cn/20190314214003962.jpg",
			        "keyword": "more"
			    });
			}
			</script>
		
      
    </div>
    
    
    

    

    

    

    <footer class="post-footer">
      

      
      
      

      
        <div class="post-nav">
          <div class="post-nav-next post-nav-item">
            
              <a href="/posts/54929501.html" rel="next" title="RocketMQ一行代码造成大量消息发送失败">
                <i class="fa fa-chevron-left"></i> RocketMQ一行代码造成大量消息发送失败
              </a>
            
          </div>

          <span class="post-nav-divider"></span>

          <div class="post-nav-prev post-nav-item">
            
              <a href="/posts/3e30e205.html" rel="prev" title="源码分析Mybatis插件(Plugin)机制与实战">
                源码分析Mybatis插件(Plugin)机制与实战 <i class="fa fa-chevron-right"></i>
              </a>
            
          </div>
        </div>
      

      
      
    </footer>
  </div>
  
  
  
  </article>



    <div class="post-spread">
      
    </div>
  </div>


          </div>
          


          

  



        </div>
        
          
  
  <div class="sidebar-toggle">
    <div class="sidebar-toggle-line-wrap">
      <span class="sidebar-toggle-line sidebar-toggle-line-first"></span>
      <span class="sidebar-toggle-line sidebar-toggle-line-middle"></span>
      <span class="sidebar-toggle-line sidebar-toggle-line-last"></span>
    </div>
  </div>

  <aside id="sidebar" class="sidebar">
    
    <div class="sidebar-inner">

      

      
        <ul class="sidebar-nav motion-element">
          <li class="sidebar-nav-toc sidebar-nav-active" data-target="post-toc-wrap">
            文章目录
          </li>
          <li class="sidebar-nav-overview" data-target="site-overview-wrap">
            站点概览
          </li>
        </ul>
      

      <section class="site-overview-wrap sidebar-panel">
        <div class="site-overview">
          <div class="site-author motion-element" itemprop="author" itemscope itemtype="http://schema.org/Person">
            
              <p class="site-author-name" itemprop="name"></p>
              <p class="site-description motion-element" itemprop="description"></p>
          </div>

          <nav class="site-state motion-element">

            
              <div class="site-state-item site-state-posts">
              
                <a href="/archives/">
              
                  <span class="site-state-item-count">139</span>
                  <span class="site-state-item-name">日志</span>
                </a>
              </div>
            

            
              
              
              <div class="site-state-item site-state-categories">
                <a href="/categories/index.html">
                  <span class="site-state-item-count">18</span>
                  <span class="site-state-item-name">分类</span>
                </a>
              </div>
            

            

          </nav>

          

          

          
          

          
          

          

        </div>
      </section>

      
      <!--noindex-->
        <section class="post-toc-wrap motion-element sidebar-panel sidebar-panel-active">
          <div class="post-toc">

            
              
            

            
              <div class="post-toc-content"><ol class="nav"><li class="nav-item nav-level-2"><a class="nav-link" href="#1%E3%80%81%E8%83%8C%E6%99%AF"><span class="nav-number">1.</span> <span class="nav-text">1、背景</span></a></li><li class="nav-item nav-level-2"><a class="nav-link" href="#2%E3%80%81%E9%97%AE%E9%A2%98%E5%88%86%E6%9E%90"><span class="nav-number">2.</span> <span class="nav-text">2、问题分析</span></a><ol class="nav-child"><li class="nav-item nav-level-4"><a class="nav-link" href="#2-1-rocketmq-console-%E6%95%B0%E6%8D%AE%E8%8E%B7%E8%8E%B7%E5%8F%96%E9%80%BB%E8%BE%91%E6%8E%A2%E8%AE%A8"><span class="nav-number">2.0.1.</span> <span class="nav-text">2.1 rocketmq-console 数据获获取逻辑探讨</span></a></li><li class="nav-item nav-level-4"><a class="nav-link" href="#2-2-rocketmq-%E6%B6%88%E8%B4%B9TPS%E7%BB%9F%E8%AE%A1%E5%AE%9E%E7%8E%B0%E5%8E%9F%E7%90%86"><span class="nav-number">2.0.2.</span> <span class="nav-text">2.2 rocketmq 消费TPS统计实现原理</span></a><ol class="nav-child"><li class="nav-item nav-level-5"><a class="nav-link" href="#2-2-1-%E6%B6%88%E8%B4%B9TPS%E8%AE%A1%E7%AE%97%E9%80%BB%E8%BE%91"><span class="nav-number">2.0.2.1.</span> <span class="nav-text">2.2.1 消费TPS计算逻辑</span></a></li><li class="nav-item nav-level-5"><a class="nav-link" href="#2-2-2-%E5%A6%82%E4%BD%95%E9%87%87%E9%9B%86%E6%B6%88%E8%B4%B9TPS%E5%8E%9F%E5%A7%8B%E6%95%B0%E6%8D%AE"><span class="nav-number">2.0.2.2.</span> <span class="nav-text">2.2.2 如何采集消费TPS原始数据</span></a></li></ol></li></ol></li></ol></li><li class="nav-item nav-level-2"><a class="nav-link" href="#3%E3%80%81%E9%97%AE%E9%A2%98%E8%A7%A3%E7%AD%94"><span class="nav-number">3.</span> <span class="nav-text">3、问题解答</span></a></li></ol></div>
            

          </div>
        </section>
      <!--/noindex-->
      

      

    </div>
  </aside>


        
      </div>
    </main>

    <footer id="footer" class="footer">
      <div class="footer-inner">
        <div class="copyright">&copy; <span itemprop="copyrightYear">2021</span>
  <span class="with-love">
    <i class="fa fa-user"></i>
  </span>
  <span class="author" itemprop="copyrightHolder">中间件兴趣圈</span>

  
</div>


  <div class="powered-by">由 <a class="theme-link" target="_blank" href="https://hexo.io">Hexo</a> 强力驱动</div>



  <span class="post-meta-divider">|</span>



  <div class="theme-info">主题 &mdash; <a class="theme-link" target="_blank" href="https://github.com/iissnan/hexo-theme-next">NexT.Muse</a> v5.1.4</div>




        
<div class="busuanzi-count">
  <script async src="https://dn-lbstatics.qbox.me/busuanzi/2.3/busuanzi.pure.mini.js"></script>

  
    <span class="site-uv">
      <i class="fa fa-user"></i>
      <span class="busuanzi-value" id="busuanzi_value_site_uv"></span>
      
    </span>
  

  
    <span class="site-pv">
      <i class="fa fa-eye"></i>
      <span class="busuanzi-value" id="busuanzi_value_site_pv"></span>
      
    </span>
  
</div>








        
      </div>
    </footer>

    
      <div class="back-to-top">
        <i class="fa fa-arrow-up"></i>
        
      </div>
    

    

  </div>

  

<script type="text/javascript">
  if (Object.prototype.toString.call(window.Promise) !== '[object Function]') {
    window.Promise = null;
  }
</script>









  












  
  
    <script type="text/javascript" src="/lib/jquery/index.js?v=2.1.3"></script>
  

  
  
    <script type="text/javascript" src="/lib/fastclick/lib/fastclick.min.js?v=1.0.6"></script>
  

  
  
    <script type="text/javascript" src="/lib/jquery_lazyload/jquery.lazyload.js?v=1.9.7"></script>
  

  
  
    <script type="text/javascript" src="/lib/velocity/velocity.min.js?v=1.2.1"></script>
  

  
  
    <script type="text/javascript" src="/lib/velocity/velocity.ui.min.js?v=1.2.1"></script>
  

  
  
    <script type="text/javascript" src="/lib/fancybox/source/jquery.fancybox.pack.js?v=2.1.5"></script>
  


  


  <script type="text/javascript" src="/js/src/utils.js?v=5.1.4"></script>

  <script type="text/javascript" src="/js/src/motion.js?v=5.1.4"></script>



  
  

  
  <script type="text/javascript" src="/js/src/scrollspy.js?v=5.1.4"></script>
<script type="text/javascript" src="/js/src/post-details.js?v=5.1.4"></script>



  


  <script type="text/javascript" src="/js/src/bootstrap.js?v=5.1.4"></script>



  


  




	





  





  












  





  

  
  <script src="https://cdn1.lncld.net/static/js/av-core-mini-0.6.4.js"></script>
  <script>AV.initialize("NNEhOL0iOcflg8f1U3HUqiCq-gzGzoHsz", "7kSmkbbb3DktmHALlShDsBUF");</script>
  <script>
    function showTime(Counter) {
      var query = new AV.Query(Counter);
      var entries = [];
      var $visitors = $(".leancloud_visitors");

      $visitors.each(function () {
        entries.push( $(this).attr("id").trim() );
      });

      query.containedIn('url', entries);
      query.find()
        .done(function (results) {
          var COUNT_CONTAINER_REF = '.leancloud-visitors-count';

          if (results.length === 0) {
            $visitors.find(COUNT_CONTAINER_REF).text(0);
            return;
          }

          for (var i = 0; i < results.length; i++) {
            var item = results[i];
            var url = item.get('url');
            var time = item.get('time');
            var element = document.getElementById(url);

            $(element).find(COUNT_CONTAINER_REF).text(time);
          }
          for(var i = 0; i < entries.length; i++) {
            var url = entries[i];
            var element = document.getElementById(url);
            var countSpan = $(element).find(COUNT_CONTAINER_REF);
            if( countSpan.text() == '') {
              countSpan.text(0);
            }
          }
        })
        .fail(function (object, error) {
          console.log("Error: " + error.code + " " + error.message);
        });
    }

    function addCount(Counter) {
      var $visitors = $(".leancloud_visitors");
      var url = $visitors.attr('id').trim();
      var title = $visitors.attr('data-flag-title').trim();
      var query = new AV.Query(Counter);

      query.equalTo("url", url);
      query.find({
        success: function(results) {
          if (results.length > 0) {
            var counter = results[0];
            counter.fetchWhenSave(true);
            counter.increment("time");
            counter.save(null, {
              success: function(counter) {
                var $element = $(document.getElementById(url));
                $element.find('.leancloud-visitors-count').text(counter.get('time'));
              },
              error: function(counter, error) {
                console.log('Failed to save Visitor num, with error message: ' + error.message);
              }
            });
          } else {
            var newcounter = new Counter();
            /* Set ACL */
            var acl = new AV.ACL();
            acl.setPublicReadAccess(true);
            acl.setPublicWriteAccess(true);
            newcounter.setACL(acl);
            /* End Set ACL */
            newcounter.set("title", title);
            newcounter.set("url", url);
            newcounter.set("time", 1);
            newcounter.save(null, {
              success: function(newcounter) {
                var $element = $(document.getElementById(url));
                $element.find('.leancloud-visitors-count').text(newcounter.get('time'));
              },
              error: function(newcounter, error) {
                console.log('Failed to create');
              }
            });
          }
        },
        error: function(error) {
          console.log('Error:' + error.code + " " + error.message);
        }
      });
    }

    $(function() {
      var Counter = AV.Object.extend("Counter");
      if ($('.leancloud_visitors').length == 1) {
        addCount(Counter);
      } else if ($('.post-title-link').length > 1) {
        showTime(Counter);
      }
    });
  </script>



  

  

  
  

  

  

  

</body>
</html>
